LuCloud 10 MapReduce

LuCloud 10 MapReduce

六月 19, 2019

mp

概述

来源

MapReduce是一个软件框架,可以支持大规模数据集上的并行和分布式计算。设计这个抽象模型的灵感来自Lisp和许多其他函数式语言的Map和Reduce的原语。

MapReduce由谷歌提出,具体论文可见谷歌三大核心技术(二)Google MapReduce中文版

原理

MapReduce编程模型的原理是:利用一个输入key/value pair集合来产生一个输出的key/value pair集合。MapReduce库的用户用两个函数表达这个计算:Map和Reduce。

用户自定义的Map函数接受一个输入的key/value pair值,然后产生一个中间key/value pair值的集合。MapReduce库把所有具有相同中间key值I的中间value值集合在一起后传递给reduce函数。

用户自定义的Reduce函数接受一个中间key的值I和相关的一个value值的集合。Reduce函数合并这些value值,形成一个较小 的value值的集合。一般的,每次Reduce函数调用只产生0或1个输出value值。通常我们通过一个迭代器把中间value值提供给Reduce 函数,这样我们就可以处理无法全部放入内存中的大量的value值的集合。

执行概述

官方描述

通过将Map调用的输入数据自动分割为M个数据片段的集合,Map调用被分布到多台机器上执行。输入的数据片段能够在不同的机器上并行处理。使 用分区函数将Map调用产生的中间key值分成R个不同分区(例如,hash(key) mod R),Reduce调用也被分布到多台机器上执行。分区数量(R)和分区函数由用户来指定。

谷歌三大核心技术(一)The Google File System中文版

​ 图1展示了我们的MapReduce实现中操作的全部流程。当用户调用MapReduce函数时,将发生下面的一系列动作(下面的序号和图1中的序号一一对应):

  1. 用户程序首先调用的MapReduce库将输入文件分成M个数据片度,每个数据片段的大小一般从 16MB到64MB(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在机群中创建大量的程序副本。 (alex:copies of the program还真难翻译)
  2. 这些程序副本中的有一个特殊的程序–master。副本中其它的程序都是worker程序,由master分配任务。有M个Map任务和R个Reduce任务将被分配,master将一个Map任务或Reduce任务分配给一个空闲的worker。
  3. 被分配了map任务的worker程序读取相关的输入数据片段,从输入的数据片段中解析出key/value pair,然后把key/value pair传递给用户自定义的Map函数,由Map函数生成并输出的中间key/value pair,并缓存在内存中。
  4. 缓存中的key/value pair通过分区函数分成R个区域,之后周期性的写入到本地磁盘上。缓存的key/value pair在本地磁盘上的存储位置将被回传给master,由master负责把这些存储位置再传送给Reduce worker。
  5. 当Reduce worker程序接收到master程序发来的数据存储位置信息后,使用RPC从Map worker所在主机的磁盘上读取这些缓存数据。当Reduce worker读取了所有的中间数据后,通过对key进行排序后使得具有相同key值的数据聚合在一起。由于许多不同的key值会映射到相同的Reduce 任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。
  6. Reduce worker程序遍历排序后的中间数据,对于每一个唯一的中间key值,Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数。Reduce函数的输出被追加到所属分区的输出文件。
  7. 当所有的Map和Reduce任务都完成之后,master唤醒用户程序。在这个时候,在用户程序里的对MapReduce调用才返回。

​ 在成功完成任务之后,MapReduce的输出存放在R个输出文件中(对应每个Reduce任务产生一个输出文件,文件名由用户指定)。一般情况 下,用户不需要将这R个输出文件合并成一个文件–他们经常把这些文件作为另外一个MapReduce的输入,或者在另外一个可以处理多个分割文件的分布式 应用中使用。

来源:谷歌三大核心技术(二)Google MapReduce中文版

范式描述

  1. Data partitioning The MapReduce library splits the input data (files), already stored in GFS, into M pieces that also correspond to the number of map tasks.

  2. Computation partitioning This is implicitly handled (in the MapReduce framework) by obliging users to write their programs in the form of the Map and Reduce functions. Therefore, the MapReduce library only generates copies of a user program (e.g., by a fork system call) containing the Map and the Reduce functions, distributes them, and starts them up on a number of available computation engines.

  3. Determining the master and workers The MapReduce architecture is based on a masterworker model. Therefore, one of the copies of the user program becomes the master and the rest become workers. The master picks idle workers, and assigns the map and reduce tasks to them. A map/reduce worker is typically a computation engine such as a cluster node to run map/ reduce tasks by executing Map/Reduce functions. Steps 4–7 describe the map workers.

  4. Reading the input data (data distribution) Each map worker reads its corresponding portion of the input data, namely the input data split, and sends it to its Map function. Although a map worker may run more than one Map function, which means it has been assigned more than one input data split, each worker is usually assigned one input split only.

  5. Map function Each Map function receives the input data split as a set of (key, value) pairs to process and produce the intermediated (key, value) pairs.

  6. Combiner function This is an optional local function within the map worker which applies to intermediate (key, value) pairs. The user can invoke the Combiner function inside the user program. The Combiner function runs the same code written by users for the Reduce function as its functionality is identical to it. The Combiner function merges the local data of each map worker before sending it over the network to effectively reduce its communication costs. As mentioned in our discussion of logical data flow, the MapReduce framework sorts and groups the data before it is processed by the Reduce function. Similarly, the MapReduce framework will also sort and group the local data on each map worker if the user invokes the Combiner function.

  7. Partitioning function As mentioned in our discussion of the MapReduce data flow, the intermediate (key, value) pairs with identical keys are grouped together because all values inside each group should be processed by only one Reduce function to generate the final result. However, in real implementations, since there are M map and R reduce tasks, intermediate (key, value) pairs with the same key might be produced by different map tasks, although they should be grouped and processed together by one Reduce function only. Therefore, the intermediate (key, value) pairs produced by each map worker are partitioned into R regions, equal to the number of reduce tasks, by the Partitioning function to guarantee that all (key, value) pairs with identical keys are stored in the same region. As a result, since reduce worker i reads the data of region i of all map workers, all (key, value) pairs with the same key will be gathered by reduce worker i accordingly (see Figure 6.4). To implement this technique, a Partitioning function could simply be a hash function (e.g., Hash(key) mod R) that forwards the data into particular regions. It is also worth noting that the locations of the buffered data in these R partitions are sent to the master for later forwarding of data to thereduceworkers. Figure 6.5 shows the data flow implementation of all data flow steps. The following are two networking steps:

  8. Synchronization MapReduce applies a simple synchronization policy to coordinate map workers with reduce workers, in which the communication between them starts when all map tasks finish.

  9. Communication Reduce worker i, already notified of the location of region i of all map workers, uses a remote procedure call to read the data from the respective region of all map workers. Since all reduce workers read the data from all map workers, all-to-all communication among all map and reduce workers, which incurs network congestion, occurs in the network. This issue is one of the major bottlenecks in increasing the performance of such systems [50–52].A data transfer module was proposed to schedule data transfers independently [55]. Steps 10 and 11 correspond to the reduce worker domain:

  10. Sorting and Grouping When the process of reading the input data is finalized by a reduce worker, the data is initially buffered in the local disk of the reduce worker. Then the reduce worker groups intermediate (key, value) pairs by sorting the data based on their keys, followed by grouping all occurrences of identical keys. Note that the buffered data is sorted and grouped because the number of unique keys produced by a map worker may be more than R regions in which more than one key exists in each region of a map worker (see Figure 6.4).

  11. Reduce function The reduce worker iterates over the grouped (key, value) pairs, and for each unique key, it sends the key and corresponding values to the Reduce function. Then this function processes its input data and stores the output results in predetermined files in the user’s program.

mpr

来源:Distributed and Cloud Compution

几个问题

Shuffle

Map是映射,负责数据的过滤分法,将原始数据转化为键值对;Reduce是合并,将具有相同key值的value进行处理后再输出新的键值对作为最终结果。为了让Reduce可以并行处理Map的结果,必须对Map的输出进行一定的排序与分割,然后再交给对应的Reduce,而这个将Map输出进行进一步整理并交给Reduce的过程就是Shuffle。

Map和Reduce操作需要我们自己定义相应Map类和Reduce类,以完成我们所需要的化简、合并操作,而shuffle则是系统自动帮我们实现的,了解shuffle的具体流程能帮助我们编写出更加高效的Mapreduce程序。

Shuffle过程包含在Map和Reduce两端,即Map shuffle和Reduce shuffle。

具体见MapReduce shuffle过程详解

数据先存到环形内存缓冲区中,当数据在环形内存缓存区中存储满阈值时,启动spill线程,这时进行HashPartition分区和Key.compareTo()排序操作(如果设置了Combiner则将分区排序后没有溢写到磁盘的数据再合并,最后将合并的数据溢写到磁盘)。

Shuffle一开始就是map阶段做输出操作,一般mapreduce计算的都是海量数据,map输出时候不可能把所有文件都放到内存操作,因此map写入磁盘的过程十分的复杂,更何况map输出时候要对结果进行排序,内存开销是很大的,map在做输出时候会在内存里开启一个环形内存缓冲区,这个缓冲区专门用来输出的,默认大小是100mb,并且在配置文件里为这个缓冲区设定了一个阀值,默认是0.80(这个大小和阀值都是可以在配置文件里进行配置的),同时map还会为输出操作启动一个守护线程,如果缓冲区的内存达到了阀值的80%时候,这个守护线程就会把内容写到磁盘上,这个过程叫spill,另外的20%内存可以继续写入要写进磁盘的数据,写入磁盘和写入内存操作是互不干扰的,如果缓存区被撑满了,那么map就会阻塞写入内存的操作,让写入磁盘操作完成后再继续执行写入内存操作,前面我讲到写入磁盘前会有个排序操作,这个是在写入磁盘操作时候进行,不是在写入内存时候进行的,如果我们定义了combiner函数,那么排序前还会执行combiner操作。

​ 每次spill操作也就是写入磁盘操作时候就会写一个溢出文件,也就是说在做map输出有几次spill就会产生多少个溢出文件,等map输出全部做完后,map会合并这些输出文件。这个过程里还会有一个Partitioner操作,对于这个操作很多人都很迷糊,其实Partitioner操作和map阶段的输入分片(Input split)很像,一个Partitioner对应一个reduce作业,如果我们mapreduce操作只有一个reduce操作,那么Partitioner就只有一个,如果我们有多个reduce操作,那么Partitioner对应的就会有多个,Partitioner因此就是reduce的输入分片,这个程序员可以编程控制,主要是根据实际key和value的值,根据实际业务类型或者为了更好的reduce负载均衡要求进行,这是提高reduce效率的一个关键所在。

来源:MapReduce combiner阶段 与shuffle阶段的区别

容错

因为MapReduce库的设计初衷是使用由成百上千的机器组成的集群来处理超大规模的数据,所以,这个库必须要能很好的处理机器故障。

worker故障master周期性的ping每个worker。如果在一个约定的时间范围内没有收到worker返回的信息,master将把这个 worker标记为失效。所有由这个失效的worker完成的Map任务被重设为初始的空闲状态,之后这些任务就可以被安排给其他的worker。同样 的,worker失效时正在运行的Map或Reduce任务也将被重新置为空闲状态,等待重新调度。

当worker故障时,由于已经完成的Map任务的输出存储在这台机器上,Map任务的输出已不可访问了,因此必须重新执行。而已经完成的Reduce任务的输出存储在全局文件系统上,因此不需要再次执行。

当一个Map任务首先被worker A执行,之后由于worker A失效了又被调度到worker B执行,这个“重新执行”的动作会被通知给所有执行Reduce任务的worker。任何还没有从worker A读取数据的Reduce任务将从worker B读取数据。

MapReduce可以处理大规模worker失效的情况。比如,在一个MapReduce操作执行期间,在正在运行的集群上进行网络维护引起 80台机器在几分钟内不可访问了,MapReduce master只需要简单的再次执行那些不可访问的worker完成的工作,之后继续执行未完成的任务,直到最终完成这个MapReduce操作。

master失败
一个简单的解决办法是让master周期性的将上面描述的数据结构 *(alex注:指3.2节)*的 写入磁盘,即检查点(checkpoint)。如果这个master任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个 master进程。然而,由于只有一个master进程,master失效后再恢复是比较麻烦的,因此我们现在的实现是如果master失效,就中止 MapReduce运算。客户可以检查到这个状态,并且可以根据需要重新执行MapReduce操作。

在失效方面的处理机制*(alex注:原文为”semantics in the presence of failures”)*
当用户提供的Map和Reduce操作是输入确定性函数(即相同的输入产生相同的输出)时,我们的分布式实现在任何情况下的输出都和所有程序没有出现任何错误、顺序的执行产生的输出是一样的。

我们依赖对Map和Reduce任务的输出是原子提交的来完成这个特性。每个工作中的任务把它的输出写到私有的临时文件中。每个Reduce任 务生成一个这样的文件,而每个Map任务则生成R个这样的文件(一个Reduce任务对应一个文件)。当一个Map任务完成的时,worker发送一个包 含R个临时文件名的完成消息给master。如果master从一个已经完成的Map任务再次接收到到一个完成消息,master将忽略这个消息;否 则,master将这R个文件的名字记录在数据结构里。

当Reduce任务完成时,Reduce worker进程以原子的方式把临时文件重命名为最终的输出文件。如果同一个Reduce任务在多台机器上执行,针对同一个最终的输出文件将有多个重命名 操作执行。我们依赖底层文件系统提供的重命名操作的原子性来保证最终的文件系统状态仅仅包含一个Reduce任务产生的数据。

使用MapReduce模型的程序员可以很容易的理解他们程序的行为,因为我们绝大多数的Map和Reduce操作是确定性的,而且存在这样的一个 事实:我们的失效处理机制等价于一个顺序的执行的操作。当Map或/和Reduce操作是不确定性的时候,我们提供虽然较弱但是依然合理的处理机制。当使 用非确定操作的时候,一个Reduce任务R1的输出等价于一个非确定性程序顺序执行产生时的输出。但是,另一个Reduce任务R2的输出也许符合一个 不同的非确定顺序程序执行产生的R2的输出。

考虑Map任务M和Reduce任务R1、R2的情况。我们设定e(Ri)是Ri已经提交的执行过程(有且仅有一个这样的执行过程)。当e(R1)读取了由M一次执行产生的输出,而e(R2)读取了由M的另一次执行产生的输出,导致了较弱的失效处理。

执行次序

reduce会等待所有mapper执行后才执行吗,还是会和mapper一起混合执行?

在同一个job中,reduce会等它所有分配的map执行完后才开始执行,map的输出是reduce的输入,但是reduce可以在map执行占比到某个阀值时就可以开始拷数据了,但真正的执行也得等拉取到所有依赖的map结果后才开始执行

map后排序作用?

sort是用来shuffle的,shuffle就是把key相同的东西弄一起去,其实不一定要sort也能shuffle,但是sort的好处是他可以通过外排降低内存使用量

MR在reduce阶段需要分组,将key相同的放在一起进行规约,为了达到该目的,有两种算法:hashmap和sort,前者太耗内存,而排序通过外排可对任意数据量分组,只要磁盘够大就行。map端排序是为了减轻reduce端排序的压力。在spark中,除了sort的方法,也提供hashmap,用户可配置,毕竟sort开销太大了。

Combiner作用

Mapreduce中的Combiner就是为了避免map任务和reduce任务之间的数据传输而设置的,Hadoop允许用户针对map task的输出指定一个合并函数。即为了减少传输到Reduce中的数据量。它主要是为了削减Mapper的输出从而减少网络带宽和Reducer之上的负载。